/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.util;

import java.io.File;
import java.io.Reader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.util.csv.CsvUpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Upserts CSV data into a Phoenix table using a Phoenix JDBC connection.
 * <p>
 * The CSV dialect (delimiter, quote, escape, header handling) is fixed at construction time and
 * exposed through {@link #getFormat()}. The column list passed to the constructor decides where
 * the header comes from:
 * <ul>
 * <li>{@code null}: {@link PhoenixHeaderSource#FROM_TABLE}</li>
 * <li>empty list: {@link PhoenixHeaderSource#IN_LINE} (first CSV line is the header)</li>
 * <li>non-empty list: {@link PhoenixHeaderSource#SUPPLIED_BY_USER}</li>
 * </ul>
 */
public class CSVCommonsLoader {

  private static final Logger LOGGER = LoggerFactory.getLogger(CSVCommonsLoader.class);

  /** Array element separator used by the four-argument constructor. */
  public static final String DEFAULT_ARRAY_ELEMENT_SEPARATOR = ":";

  /**
   * Maps the digit characters '1' through '9' to the control characters U+0001 through U+0009, so
   * that a control character can be requested by typing its digit.
   */
  private static final Map<Character, Character> CTRL_CHARACTER_TABLE =
    ImmutableMap.<Character, Character> builder().put('1', '\u0001').put('2', '\u0002')
      .put('3', '\u0003').put('4', '\u0004').put('5', '\u0005').put('6', '\u0006')
      .put('7', '\u0007').put('8', '\u0008').put('9', '\u0009').build();

  private final PhoenixConnection conn;
  private final String tableName;
  private final List<String> columns;
  private final boolean isStrict;
  private final char fieldDelimiter;
  private final char quoteCharacter;
  /** Escape character, or {@code null} when no escape character should be configured. */
  private final Character escapeCharacter;
  /** Derived from {@link #columns} in the constructor; never reassigned afterwards. */
  private final PhoenixHeaderSource headerSource;
  private final CSVFormat format;

  private final String arrayElementSeparator;

  /** Where the names of the CSV columns are taken from. */
  public enum PhoenixHeaderSource {
    /** No header in the CSV data; column names are obtained from the table. */
    FROM_TABLE,
    /** The first line of the CSV data is the header. */
    IN_LINE,
    /** The caller supplied the column names explicitly. */
    SUPPLIED_BY_USER
  }

  /**
   * Creates a loader with delimiter {@code ','}, quote character {@code '"'}, no escape character
   * and {@link #DEFAULT_ARRAY_ELEMENT_SEPARATOR} as the array element separator.
   * @param conn      connection used for the upserts
   * @param tableName name of the target table
   * @param columns   column names; {@code null}, empty or populated (see class description)
   * @param isStrict  when {@code true}, a failing record aborts the load instead of being skipped
   */
  public CSVCommonsLoader(PhoenixConnection conn, String tableName, List<String> columns,
    boolean isStrict) {
    this(conn, tableName, columns, isStrict, ',', '"', null, DEFAULT_ARRAY_ELEMENT_SEPARATOR);
  }

  /**
   * Creates a loader with a fully specified CSV dialect.
   * @param conn                  connection used for the upserts
   * @param tableName             name of the target table
   * @param columns               column names; {@code null}, empty or populated (see class
   *                              description)
   * @param isStrict              when {@code true}, a failing record aborts the load instead of
   *                              being skipped
   * @param fieldDelimiter        field delimiter; a digit 1-9 selects the matching control
   *                              character
   * @param quoteCharacter        quote character; a digit 1-9 selects the matching control
   *                              character
   * @param escapeCharacter       escape character, or {@code null} for none; a digit 1-9 selects
   *                              the matching control character
   * @param arrayElementSeparator separator handed to the upsert executor for array-valued columns
   */
  public CSVCommonsLoader(PhoenixConnection conn, String tableName, List<String> columns,
    boolean isStrict, char fieldDelimiter, char quoteCharacter, Character escapeCharacter,
    String arrayElementSeparator) {
    this.conn = conn;
    this.tableName = tableName;
    this.columns = columns;
    this.isStrict = isStrict;
    this.fieldDelimiter = fieldDelimiter;
    this.quoteCharacter = quoteCharacter;
    this.escapeCharacter = escapeCharacter;

    // The header source is implicit in the columns value.
    if (columns == null) {
      this.headerSource = PhoenixHeaderSource.FROM_TABLE;
    } else if (columns.isEmpty()) {
      this.headerSource = PhoenixHeaderSource.IN_LINE;
    } else {
      this.headerSource = PhoenixHeaderSource.SUPPLIED_BY_USER;
    }

    this.arrayElementSeparator = arrayElementSeparator;
    // Must run last: buildFormat() reads the fields assigned above, including headerSource.
    this.format = buildFormat();
  }

  /**
   * @return the CSV format derived from the constructor arguments
   */
  public CSVFormat getFormat() {
    return format;
  }

  /**
   * Builds the CSV format from the constructor settings. Starts from {@link CSVFormat#DEFAULT},
   * enables ignoring of empty lines, and applies the delimiter, quote and (if non-null) escape
   * characters after translating them with {@link #asControlCharacter(char)}. The header
   * configuration depends on the header source.
   * @return CSVFormat based on constructor settings.
   */
  private CSVFormat buildFormat() {
    CSVFormat format =
      CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(asControlCharacter(fieldDelimiter))
        .withQuote(asControlCharacter(quoteCharacter));

    if (escapeCharacter != null) {
      format = format.withEscape(asControlCharacter(escapeCharacter));
    }

    switch (headerSource) {
      case FROM_TABLE:
        // obtain headers from table, so format should not expect a header.
        break;
      case IN_LINE:
        // an empty string array triggers csv loader to grab the first line as the header
        format = format.withHeader(new String[0]);
        break;
      case SUPPLIED_BY_USER:
        // a populated string array supplied by the user
        format = format.withHeader(columns.toArray(new String[0]));
        break;
      default:
        throw new RuntimeException("Header source was unable to be inferred.");

    }
    return format;
  }

  /**
   * Translate a field separator, escape character, or phrase delimiter into a control character if
   * it is a single digit other than 0.
   * @param delimiter the character as supplied by the user
   * @return the control character U+0001..U+0009 for the digits '1'..'9'; otherwise
   *         {@code delimiter} unchanged
   */
  public static char asControlCharacter(char delimiter) {
    return CTRL_CHARACTER_TABLE.getOrDefault(delimiter, delimiter);
  }

  /**
   * Upserts data from a CSV file, read as UTF-8 and parsed with the format determined by the
   * constructor. Delegates to {@link #upsert(CSVParser)}, which closes the parser.
   * @param fileName path of the CSV file to load
   * @throws Exception if the file cannot be opened or the upsert fails
   */
  public void upsert(String fileName) throws Exception {
    CSVParser parser = CSVParser.parse(new File(fileName), Charsets.UTF_8, format);
    upsert(parser);
  }

  /**
   * Upserts CSV data read from {@code reader}, parsed with the format determined by the
   * constructor. Delegates to {@link #upsert(CSVParser)}, which closes the parser.
   * @param reader source of the CSV text
   * @throws Exception if the parser cannot be created or the upsert fails
   */
  public void upsert(Reader reader) throws Exception {
    CSVParser parser = new CSVParser(reader, format);
    upsert(parser);
  }

  /** Joins the elements with {@code ", "}, rendering null elements as the text {@code null}. */
  private static <T> String buildStringFromList(List<T> list) {
    return Joiner.on(", ").useForNull("null").join(list);
  }

  /**
   * Upserts every record produced by {@code csvParser} into the target table.
   * <p>
   * Auto-commit is switched off for the duration of the load; the listener commits each time the
   * running upsert count is a multiple of the connection's mutate batch size, and a final commit
   * is issued once all records have been processed. A summary is printed to standard output on
   * success.
   * <p>
   * Cleanup is unconditional: the executor is closed even if execution fails, the parser is closed
   * even if column resolution fails, and auto-commit is re-enabled (when it was on at entry) even
   * if closing the parser fails.
   * @param csvParser CSVParser instance supplying the records; closed by this method
   * @throws Exception if column resolution, record upsert, commit or cleanup fails
   */
  public void upsert(CSVParser csvParser) throws Exception {
    // Stays false until the original setting has been read, so that a failure before that point
    // never touches the connection's auto-commit flag.
    boolean wasAutoCommit = false;
    try {
      List<ColumnInfo> columnInfoList = buildColumnInfoList(csvParser);

      wasAutoCommit = conn.getAutoCommit();
      conn.setAutoCommit(false);
      long start = EnvironmentEdgeManager.currentTimeMillis();
      CsvUpsertListener upsertListener =
        new CsvUpsertListener(conn, conn.getMutateBatchSize(), isStrict);
      CsvUpsertExecutor csvUpsertExecutor =
        new CsvUpsertExecutor(conn, SchemaUtil.getEscapedFullTableName(tableName), columnInfoList,
          upsertListener, arrayElementSeparator);

      try {
        csvUpsertExecutor.execute(csvParser);
      } finally {
        // release executor resources whether or not execution succeeded.
        csvUpsertExecutor.close();
      }

      conn.commit();
      double elapsedDuration = ((EnvironmentEdgeManager.currentTimeMillis() - start) / 1000.0);
      System.out
        .println("CSV Upsert complete. " + upsertListener.getTotalUpsertCount() + " rows upserted");
      System.out.println("Time: " + elapsedDuration + " sec(s)\n");

    } finally {
      try {
        // release reader resources.
        if (csvParser != null) {
          csvParser.close();
        }
      } finally {
        if (wasAutoCommit) {
          conn.setAutoCommit(true);
        }
      }
    }
  }

  /**
   * Resolves the column metadata for the load and prints which header source is in effect.
   * For {@link PhoenixHeaderSource#IN_LINE} the column names are the keys of the parser's header
   * map; otherwise the constructor-supplied list (possibly {@code null}) is used as is.
   * @param parser parser whose header map is consulted for in-line headers
   * @return column information produced by {@code SchemaUtil.generateColumnInfo}
   * @throws SQLException if the column information cannot be generated
   */
  private List<ColumnInfo> buildColumnInfoList(CSVParser parser) throws SQLException {
    List<String> headerColumns = this.columns;
    switch (headerSource) {
      case FROM_TABLE:
        System.out.println("csv columns from database.");
        break;
      case IN_LINE:
        headerColumns = new ArrayList<>(parser.getHeaderMap().keySet());
        System.out.println(String.format("csv columns from header line. length=%s, %s",
          headerColumns.size(), buildStringFromList(headerColumns)));
        break;
      case SUPPLIED_BY_USER:
        System.out.println(String.format("csv columns from user. length=%s, %s",
          headerColumns.size(), buildStringFromList(headerColumns)));
        break;
      default:
        throw new IllegalStateException("parser has unknown column source.");
    }
    return SchemaUtil.generateColumnInfo(conn, tableName, headerColumns, isStrict);
  }

  /**
   * Listener that commits the connection periodically during a load and decides whether a failing
   * record aborts the load.
   */
  static class CsvUpsertListener implements UpsertExecutor.UpsertListener<CSVRecord> {

    private final PhoenixConnection conn;
    private final int upsertBatchSize;
    private long totalUpserts = 0L;
    private final boolean strict;

    /**
     * @param conn            connection to commit on
     * @param upsertBatchSize number of upserts between commits
     * @param strict          when {@code true}, record errors are rethrown
     */
    CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize, boolean strict) {
      this.conn = conn;
      this.upsertBatchSize = upsertBatchSize;
      this.strict = strict;
    }

    /**
     * Records the running upsert count and commits whenever it is a multiple of the batch size.
     * @param upsertCount the count reported by the executor; stored as the running total
     * @throws RuntimeException wrapping the {@link SQLException} if the commit fails
     */
    @Override
    public void upsertDone(long upsertCount) {
      totalUpserts = upsertCount;
      if (upsertCount % upsertBatchSize == 0) {
        // The progress message is only emitted at batch boundaries that are also multiples of 1000.
        if (upsertCount % 1000 == 0) {
          LOGGER.info("Processed upsert #{}", upsertCount);
        }
        try {
          LOGGER.info("Committing after {} records", upsertCount);
          conn.commit();
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }
    }

    /**
     * Logs the failing record and, in strict mode, propagates the error to abort the load.
     * In non-strict mode the error is only logged.
     */
    @Override
    public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) {
      LOGGER.error("Error upserting record " + csvRecord, throwable);
      if (strict) {
        Throwables.propagate(throwable);
      }
    }

    /**
     * Get the total number of upserts that this listener has been notified about up until now.
     * @return the total count of upserts
     */
    public long getTotalUpsertCount() {
      return totalUpserts;
    }
  }
}
